package io.mqttpush.mqttclient;

import io.mqttpush.mqttclient.pack.MqttPackExtEncoder;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.mqtt.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

/**
 * 
 * @author tianzhenjiu
 *
 */
public class Connectioner {

	
	Logger logger=LoggerFactory.getLogger(getClass());
	
	
	String host ="120.78.149.117";
	Integer port = 8018;
	Integer pingTime=59;
	String deviceId;
	String username;
	String password;
	
	NioEventLoopGroup eventLoopGroup=new NioEventLoopGroup(2);

	CancelbleExecutorService executorService = new CancelbleExecutorService(2);
	
	ConnectionHandle connectionHandle;
	
	Channel channel;
	

	Bootstrap bootstrap = new Bootstrap();
	
	
	public void initChannel(MessageListener messageListener,Consumer<Channel>  successlogin) {
		
	
		connectionHandle=new ConnectionHandle(deviceId,
				username, password, pingTime,
				executorService, 
				this::reconnection,successlogin);
		
		SubHandle subHandle=new SubHandle();
		PubHandle pubHandle=new PubHandle(messageListener);
		bootstrap.group(eventLoopGroup).
		channel(NioSocketChannel.class).
		option(ChannelOption.TCP_NODELAY, true).
		handler(new ChannelInitializer<SocketChannel>() {
			@Override
			protected void initChannel(SocketChannel ch) throws Exception {

			    ch.pipeline()
			    .addLast(MqttEncoder.INSTANCE)
			    .addLast(new MqttDecoder())
			    .addLast(new MqttPackExtEncoder())
				.addLast(connectionHandle).addLast(subHandle).addLast(pubHandle);
			}
			
			
		});
	}
	/**
	 * 连接
	 * 
	 * @return
	 */
	public ChannelFuture connection() {
		
		AtomicBoolean hasConnect=Status.hasConnect;
		
		ChannelFuture channelFuture = bootstrap.connect(host, port);

		channelFuture.addListener((ChannelFuture future) -> {

			if(!future.isSuccess()) {
				/**
				 * 连接失败，重新连接
				 */
				if(hasConnect.get()) {
					logger.info("已经连接,无需重复连接");
					return;
				}
				executorService.schedule( ()->{
					reconnection(future.channel());
				},3, TimeUnit.SECONDS);
				logger.warn("连接失败"+future.cause());
			}else {
				this.channel=channelFuture.channel();
			}
		});
		
		return channelFuture;
	}
	/**
	 * 重连接
	 * 
	 * @return
	 */
	public ChannelFuture reconnection(Channel channel) {

	
		
		AtomicBoolean hasConnect=Status.hasConnect;
		/**
		 * 清理调调度线程池
		 */
		executorService.reset();
		hasConnect.set(false);
		
		if (channel==null||!channel.isActive()) {
			return connection();
		}

		return  channel.close();
	}
	
	
	
	/**
	 * 发布
	 * @param topname
	 * @param bs
	 * @param qoS
	 * @return
	 */
	public ChannelFuture pubMsg(String topname, byte[] bs, MqttQoS qoS) {
		
		AtomicBoolean isLogin=Status.isLogin;
		
		if(channel==null) {
			logger.info("channel未连接");
			return null;
		}
		
		
		if(!isLogin.get()){
			logger.info("未登录");
			return channel.newFailedFuture(new RuntimeException("未登录"));
		}
		
	
		
			
		ByteBuf byteBuf=Unpooled.wrappedBuffer(bs);
		
		int messageid=Math.abs(Integer.valueOf(bs.hashCode()).shortValue());
		MqttFixedHeader mqttFixedHeader=
				new MqttFixedHeader(MqttMessageType.PUBLISH,false, qoS,true , 0);
 		
 		MqttPublishVariableHeader variableHeader=
 				new MqttPublishVariableHeader(topname,messageid);
 		
 		MqttPublishMessage mqttPublishMessage=
 				new MqttPublishMessage(mqttFixedHeader, variableHeader, byteBuf);
 		return channel.writeAndFlush(mqttPublishMessage);
	}
	
	/**
	 * 订阅
	 * @param topname
	 * @param qoS
	 * @return
	 */
	public ChannelFuture subscribe(String topname, MqttQoS qoS) {
		
		AtomicBoolean isLogin=Status.isLogin;
		
		if(channel==null) {
			logger.info("channel未连接");
			return null;
		}
		
		if(!isLogin.get()){
			logger.info("未登录");
			return channel.newFailedFuture(new RuntimeException("未登录"));
		}
		
		List<MqttTopicSubscription> topicSubscriptions=new ArrayList<>();
		topicSubscriptions.add(new MqttTopicSubscription(topname, qoS));
		
		
		MqttFixedHeader mqttFixedHeader=
				new MqttFixedHeader(MqttMessageType.SUBSCRIBE,false, MqttQoS.AT_LEAST_ONCE,false , 0);
		
		MqttMessageIdVariableHeader variableHeader=
				MqttMessageIdVariableHeader.from(Math.abs(Integer.valueOf(topname.hashCode()).shortValue()));
		MqttSubscribePayload payload=new MqttSubscribePayload(topicSubscriptions);
		MqttSubscribeMessage mqttSubscribeMessage=new MqttSubscribeMessage(mqttFixedHeader, variableHeader, payload);
		
		return channel.writeAndFlush(mqttSubscribeMessage);
	}
	
	
	public String getHost() {
		return host;
	}
	public Integer getPort() {
		return port;
	}
	public void setHost(String host) {
		this.host = host;
	}
	public void setPort(Integer port) {
		this.port = port;
	}
	public Integer getPingTime() {
		return pingTime;
	}
	public String getDeviceId() {
		return deviceId;
	}
	public String getUsername() {
		return username;
	}
	public String getPassword() {
		return password;
	}
	public void setPingTime(Integer pingTime) {
		this.pingTime = pingTime;
	}
	public void setDeviceId(String deviceId) {
		this.deviceId = deviceId;
	}
	public void setUsername(String username) {
		this.username = username;
	}
	public void setPassword(String password) {
		this.password = password;
	}
	
	
	
	
}
